package com.hg.transaction;

import com.hg.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * created by skh on 2019/12/15
 */
@Slf4j
public class TransactionConsumer {

	public static void main(String[] args) throws MQClientException {

		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
		consumer.setNamesrvAddr(MqConfig.nameServerAddr);
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		consumer.subscribe("TopicTransaction", "*");
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			private Random random = new Random();

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				for (MessageExt msg : msgs) {
					log.info("接收到消息:{}", msg);
					log.info("接收到消息标签:{}", msg.getTags());
					try {
						log.info("接收到消息内容:{}", new String(msg.getBody(),"UTF-8"));
					} catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					}
				}

				try {
					TimeUnit.SECONDS.sleep(random.nextInt(5));
				} catch (InterruptedException e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

			}

		});
		consumer.start();
		log.info("消费者启动成功");
	}
}
